 1.Flink CEP之基础
   
   CEP 即Complex Event Processing - 复杂事件处理，Flink CEP 是在 Flink 中实现的复杂时间处理(CEP)
库。处理事件的规则，被叫做“模式”(Pattern)，Flink CEP 提供了 Pattern API，用于对输入流数据进行
复杂事件规则定义，用来提取符合规则的事件序列。
   Pattern API 大致分为三种：个体模式，组合模式，模式组。
   Flink CEP 应用场景：
   CEP 在互联网各个行业都有应用，例如金融、物流、电商、智能交通、物联网行业等行业：
   实时监控：
   在网站的访问日志中寻找那些使用脚本或者工具“爆破”登录的用户；
   我们需要在大量的订单交易中发现那些虚假交易（超时未支付）或发现交易活跃用户；
   或者在快递运输中发现那些滞留很久没有签收的包裹等。
   风险控制：
   比如金融行业可以用来进行风险控制和欺诈识别，从交易信息中寻找那些可能存在的危险交易和非法交
易。
   营销广告：
   跟踪用户的实时行为，指定对应的推广策略进行推送，提高广告的转化率。
   
   1).定义
   2).特征
   CEP的特征如下：
   目标：从有序的简单事件流中发现一些高阶特征；
   输入：一个或多个简单事件构成的事件流；
   处理：识别简单事件之间的内在联系，多个符合一定规则的简单事件构成复杂事件；
   输出：满足规则的复杂事件。
   3).功能
   CEP用于分析低延迟、频繁产生的不同来源的事件流。CEP可以帮助在复杂的、不相关的时间流中找
出有意义的模式和复杂的关系，以接近实时或准实时的获得通知或组织一些行为。
   EP支持在流上进行模式匹配，根据模式的条件不同，分为连续的条件或不连续的条件；模式的条件
允许有时间的限制，当条件范围内没有达到满足的条件时，会导致模式匹配超时。
   看起来很简单，但是它有很多不同的功能：
   ① 输入的流数据，尽快产生结果；
   ② 在2个事件流上，基于时间进行聚合类的计算；
   ③ 提供实时/准实时的警告和通知；
   ④ 在多样的数据源中产生关联分析模式；
   ⑤ 高吞吐、低延迟的处理
   市场上有多种CEP的解决方案，例如Spark、Samza、Beam等，但他们都没有提供专门的库支持。然
而，Flink提供了专门的CEP库。
   4).主要组件
   Flink为CEP提供了专门的Flink CEP library，它包含如下组件：Event Stream、Pattern定义、Pattern
检测和生成Alert。
   首先，开发人员要在DataStream流上定义出模式条件，之后Flink CEP引擎进行模式检测，必要时生成
警告。

 2.Pattern API
   
   处理事件的规则，被叫作模式（Pattern）。
   Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则定义，用来提取符合规则的事件序
列。
   模式大致分为三类：
   ① 个体模式（Individual Patterns）
   组成复杂规则的每一个单独的模式定义，就是个体模式。
   start.times(3).where(_.behavior.startsWith(‘fav’))
   ② 组合模式（Combining Patterns，也叫模式序列）
   很多个体模式组合起来，就形成了整个的模式序列。
   模式序列必须以一个初始模式开始：
   val start = Pattern.begin(‘start’)
   ③ 模式组（Group of Pattern）
   将一个模式序列作为条件嵌套在个体模式里，成为一组模式。
   1).个体模式
   个体模式包括单例模式和循环模式。单例模式只接收一个事件，而循环模式可以接收多个事件。
   (1).量词
   可以在一个个体模式后追加量词，也就是指定循环次数。
// 匹配出现4次
start.time(4)
// 匹配出现0次或4次
start.time(4).optional
// 匹配出现2、3或4次
start.time(2,4)
// 匹配出现2、3或4次，并且尽可能多地重复匹配
start.time(2,4).greedy
// 匹配出现1次或多次
start.oneOrMore
// 匹配出现0、2或多次，并且尽可能多地重复匹配
start.timesOrMore(2).optional.greed
   (2).条件
   每个模式都需要指定触发条件，作为模式是否接受事件进入的判断依据。CEP中的个体模式主要通过
调用.where()、.or()和.until()来指定条件。按不同的调用方式，可以分成以下几类：
   ① 简单条件
   通过.where()方法对事件中的字段进行判断筛选，决定是否接收该事件
   start.where(event=>event.getName.startsWith(“foo”))
   ② 组合条件
   将简单的条件进行合并；or()方法表示或逻辑相连，where的直接组合就相当于与and。
   Pattern.where(event => …/*some condition*/).or(event => /*or condition*/)
   ③ 终止条件
   如果使用了oneOrMore或者oneOrMore.optional，建议使用.until()作为终止条件，以便清理状态。
   ④ 迭代条件
   能够对模式之前所有接收的事件进行处理；调用.where((value,ctx) => {…})，可以调用
ctx.getEventForPattern(“name”)
   2).模式序列
   不同的近邻模式如下图：
   (1).严格近邻
   所有事件按照严格的顺序出现，中间没有任何不匹配的事件，由.next()指定。例如对于模式“a next
b”，事件序列“a,c,b1,b2”没有匹配。
   (2).宽松近邻
   允许中间出现不匹配的事件，由.followedBy()指定。例如对于模式“a followedBy b”，事件序列
“a,c,b1,b2”匹配为{a,b1}。
   (3).非确定性宽松近邻
   进一步放宽条件，之前已经匹配过的事件也可以再次使用，由.followedByAny()指定。例如对于模式
“a followedByAny b”，事件序列“a,c,b1,b2”匹配为{ab1}，{a,b2}。
   除了以上模式序列外，还可以定义“不希望出现某种近邻关系”：
   .notNext()：不想让某个事件严格紧邻前一个事件发生。
   .notFollowedBy()：不想让某个事件在两个事件之间发生。
   需要注意：
   ①所有模式序列必须以.begin()开始；
   ②模式序列不能以.notFollowedBy()结束；
   ③“not”类型的模式不能被optional所修饰；
   ④可以为模式指定时间约束，用来要求在多长时间内匹配有效。
   next.within(Time.seconds(10))
   3).模式的检测
   指定要查找的模式序列后，就可以将其应用于输入流以检测潜在匹配。调用CEP.pattern()，给定输入
流和模式，就能得到一个PatternStream。
   val input:DataStream[Event] = …
   val pattern:Pattern[Event,_] = …
   val patternStream:PatternStream[Event]=CEP.pattern(input,pattern)
   4).匹配事件的提取
   创建PatternStream之后，就可以应用select或者flatSelect方法，从检测到的事件序列中提取事件
了。 
   select()方法需要输入一个select function作为参数，每个成功匹配的事件序列都会调用它。
   select()以一个Map[String,Iterable[IN]]来接收匹配到的事件序列，其中key就是每个模式的名称，而
value就是所有接收到的事件的Iterable类型。
	def selectFn(pattern : Map[String,Iterable[IN]]):OUT={
 		val startEvent = pattern.get(“start”).get.next
 		val endEvent = pattern.get(“end”).get.next
 		OUT(startEvent, endEvent)
	}
   flatSelect通过实现PatternFlatSelectFunction实现与select相似的功能。唯一的区别就是flatSelect方
法可以返回多条记录，它通过一个Collector[OUT]类型的参数来将要输出的数据传递到下游。
   process
   select	
   5).超时事件的提取
   当一个模式通过within关键字定义了检测窗口时间时，部分事件序列可能因为超过窗口长度而被丢
弃；为了能够处理这些超时的部分匹配，select和flatSelect API调用允许指定超时处理程序。
   Flink CEP 开发流程：
      1. DataSource 中的数据转换为 DataStream；
      2. 定义 Pattern，并将 DataStream 和 Pattern 组合转换为 PatternStream；
      3. PatternStream 经过 select、process 等算子转换为 DataStraem；
      4. 再次转换的 DataStream 经过处理后，sink 到目标库。
   select方法：
	SingleOutputStreamOperator<PayEvent> result =
	patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent,
PayEvent>() {
  		@Override
  		public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws
Exception {
    		return map.get("begin").get(0);
 		}
}, new PatternSelectFunction<PayEvent, PayEvent>() {
  		@Override
  		public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {
    		return map.get("pay").get(0);
		}
	})	
   
   对检测到的模式序列应用选择函数。对于每个模式序列，调用提供的{@link PatternSelectFunction}。
模式选择函数只能产生一个结果元素。
   对超时的部分模式序列应用超时函数。对于每个部分模式序列，调用提供的{@link
PatternTimeoutFunction}。模式超时函数只能产生一个结果元素。
   您可以在使用相同的{@link OutputTag}进行select操作的{@link SingleOutputStreamOperator}上获
得由{@link SingleOutputStreamOperator}生成的{@link SingleOutputStreamOperator}生成的超时
数据流。
   @param timedOutPartialMatchesTag 标识端输出超时模式的@link OutputTag}
   @param patternTimeoutFunction 为超时的每个部分模式序列调用的模式超时函数。
   @param patternSelectFunction 为每个检测到的模式序列调用的模式选择函数。
   @param 产生的超时元素的类型
   @param 结果元素的类型
   return {@link DataStream}，其中包含产生的元素和在边输出中产生的超时元素。 
   DataStream<PayEvent> sideOutput = result.getSideOutput(orderTimeoutOutput);
   获取{@link DataStream}，该{@link DataStream}包含由操作发出到指定{@link OutputTag}的边输出
的元素。

 3.NFA：非确定有限自动机
   
   FlinkCEP在运行时会将用户的逻辑转化成这样的一个NFA Graph (nfa对象)
   所以有限状态机的工作过程，就是从开始状态，根据不同的输入，自动进行状态转换的过程。
   上图中的状态机的功能，是检测二进制数是否含有偶数个 0. 从图上可以看出，输入只有 1 和 0 两种。
从 S1 状态开始，只有输入 0 才会转换到 S2 状态，同样 S2 状态下只有输入 0 才会转换到 S1。所以，
二进制数输入完毕，如果满足最终状态，也就是最后停在 S1 状态，那么输入的二进制数就含有偶数个
0。

 4.案例
   
   Flink CEP 开发流程：
       1. DataSource 中的数据转换为 DataStream；watermark、keyby
       2. 定义 Pattern，并将 DataStream 和 Pattern 组合转换为 PatternStream；
       3. PatternStream 经过 select、process 等算子转换为 DataStream；
       4. 再次转换的 DataStream 经过处理后，sink 到目标库。
   案例1：恶意登录检测
   需求：找出5秒内，连续登录失败的账号
   思路：
   1、 数据源
			new CEPLoginBean(1L, "fail", 1597905234000L),
			new CEPLoginBean(1L, "success", 1597905235000L),
			new CEPLoginBean(2L, "fail", 1597905236000L),
			new CEPLoginBean(2L, "fail", 1597905237000L),
			new CEPLoginBean(2L, "fail", 1597905238000L),
			new CEPLoginBean(3L, "fail", 1597905239000L),
			new CEPLoginBean(3L, "success", 1597905240000L)
   2、 在数据源上做出watermark
   3、 在watermark上根据id分组keyby
   4、 做出模式pattern  
	Pattern<CEPLoginBean, CEPLoginBean> pattern = Pattern.
	<CEPLoginBean>begin("start").where(new IterativeCondition<CEPLoginBean>() {
      	@Override
      	public boolean filter(CEPLoginBean value, Context<CEPLoginBean> ctx)
throws Exception {
        	return value.getLogresult().equals("fail");
     	}
   })
       		.next("next").where(new IterativeCondition<CEPLoginBean>() {
          		@Override
          		public boolean filter(CEPLoginBean value,
Context<CEPLoginBean> ctx) throws Exception {
            		return value.getLogresult().equals("fail");
         		}
       		})
       		.within(Time.seconds(5));
   5、 在数据流上进行模式匹配
   6、 提取匹配成功的数据
   代码：
   依赖：
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>1.11.1</version>
        </dependency>

package com.lagou.mycep;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class LoginDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        //1、 数据源
        DataStreamSource<LoginBean> data = env.fromElements(
                new LoginBean(1L, "fail", 1597905234000L),
                new LoginBean(1L, "success", 1597905235000L),
                new LoginBean(2L, "fail", 1597905236000L),
                new LoginBean(2L, "fail", 1597905237000L),
                new LoginBean(2L, "fail", 1597905238000L),
                new LoginBean(3L, "fail", 1597905239000L),
                new LoginBean(3L, "success", 1597905240000L)
        );
        //2、 在数据源上做出watermark
        SingleOutputStreamOperator<LoginBean> watermarks = data.assignTimestampsAndWatermarks(new WatermarkStrategy<LoginBean>() {
            @Override
            public WatermarkGenerator<LoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<LoginBean>() {
                    long maxTimeStamp = Long.MIN_VALUE;

                    @Override
                    public void onEvent(LoginBean event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
                    }

                    long maxOutOfOderness = 5000l;

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(maxOutOfOderness));
                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> element.getTs()));
        //3、 在watermark上根据id分组keyby
        KeyedStream<LoginBean, Long> keyed = watermarks.keyBy(value -> value.getId());
        //4、 做出模式pattern
        Pattern<LoginBean, LoginBean> pattern = Pattern.<LoginBean>begin("start").where(new IterativeCondition<LoginBean>() {
            @Override
            public boolean filter(LoginBean value, Context<LoginBean> context) throws Exception {
                return value.getState().equals("fail");
            }
        })
                .next("next").where(new IterativeCondition<LoginBean>() {
                    @Override
                    public boolean filter(LoginBean value, Context<LoginBean> context) throws Exception {
                        return value.getState().equals("fail");
                    }
                })
                .within(Time.seconds(5));
        //5、 在数据流上进行模式匹配
        PatternStream<LoginBean> patternStream = CEP.pattern(keyed, pattern);
        //6、 提取匹配成功的数据
        SingleOutputStreamOperator<Long> result = patternStream.process(new PatternProcessFunction<LoginBean, Long>() {
            @Override
            public void processMatch(Map<String, List<LoginBean>> match, Context ctx, Collector<Long> out) throws Exception {
//                System.out.println(match);
                out.collect(match.get("start").get(0).getId());

            }
        });
        result.print();
        env.execute();

    }
}


   案例2：检测交易活跃用户
   需求：找出24小时内，至少5次有效交易的用户：
   思路：
   1、 数据源：
				new ActiveUserBean("100XX", 0.0D, 1597905234000L),
				new ActiveUserBean("100XX", 100.0D, 1597905235000L),
				new ActiveUserBean("100XX", 200.0D, 1597905236000L),
				new ActiveUserBean("100XX", 300.0D, 1597905237000L),
				new ActiveUserBean("100XX", 400.0D, 1597905238000L),
				new ActiveUserBean("100XX", 500.0D, 1597905239000L),
				new ActiveUserBean("101XX", 0.0D, 1597905240000L),
				new ActiveUserBean("101XX", 100.0D, 1597905241000L)
   2、 watermark转化
   3、 keyby转化
   4、 做出pattern
   至少5次：timesOrMore(5)
   24 小时之内：within(Time.hours(24))
	Pattern<ActiveUserBean, ActiveUserBean> pattern = Pattern.
<ActiveUserBean>begin("start").where(new SimpleCondition<ActiveUserBean>() {
     	@Override
     	public boolean filter(ActiveUserBean value) throws Exception {
       		return value.getMoney() > 0;
     	}
   	}).timesOrMore(5).within(Time.hours(24));;
   5、 模式匹配
   6、 提取匹配成功数据
   代码：
package com.lagou.mycep;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class DauDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<UserBean> data = env.fromElements(
                new UserBean("100XX", 0.0D, 1597905234000L),
                new UserBean("100XX", 100.0D, 1597905235000L),
                new UserBean("100XX", 200.0D, 1597905236000L),
                new UserBean("100XX", 300.0D, 1597905237000L),
                new UserBean("100XX", 400.0D, 1597905238000L),
                new UserBean("100XX", 500.0D, 1597905239000L),
                new UserBean("101XX", 0.0D, 1597905240000L),
                new UserBean("101XX", 100.0D, 1597905241000L)

        );

        SingleOutputStreamOperator<UserBean> watermarks = data.assignTimestampsAndWatermarks(new WatermarkStrategy<UserBean>() {
            @Override
            public WatermarkGenerator<UserBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<UserBean>() {
                    long maxTimeStamp = Long.MIN_VALUE;
                    long maxOutOfOrderness = 5000l;

                    @Override
                    public void onEvent(UserBean event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));

                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> element.getTs()));
        KeyedStream<UserBean, String> keyed = watermarks.keyBy(value -> value.getUid());

        //做出pattren
        Pattern<UserBean, UserBean> pattern = Pattern.<UserBean>begin("begin").where(new IterativeCondition<UserBean>() {
            @Override
            public boolean filter(UserBean value, Context<UserBean> ctx) throws Exception {
                return value.getMoney() > 0;
            }
        })
                .timesOrMore(5)
                .within(Time.hours(24));
        PatternStream<UserBean> patternStream = CEP.pattern(keyed, pattern);

        SingleOutputStreamOperator<String> result = patternStream.process(new PatternProcessFunction<UserBean, String>() {
            @Override
            public void processMatch(Map<String, List<UserBean>> match, Context ctx, Collector<String> out) throws Exception {
                out.collect(match.get("begin").get(0).getUid());
            }
        });
        result.print();
        env.execute();
    }
}


   案例3：超时未支付
   需求：找出下单后10分钟没有支付的订单
   思路：
   1、 数据源：
				new PayEvent(1L, "create", 1597905234000L),
       			new PayEvent(1L, "pay", 1597905235000L),
       			new PayEvent(2L, "create", 1597905236000L),
       			new PayEvent(2L, "pay", 1597905237000L),
       			new PayEvent(3L, "create", 1597905239000L)
   2、 转化watermark
   3、 keyby转化
   4、 做出Pattern（下单以后10分钟内未支付）
   注意：下单为create 支付为pay ，create和pay之间不需要是严格临近，所以选择followedBy
	Pattern<PayEvent, PayEvent> pattern = Pattern.<PayEvent>
       		begin("begin")
       		.where(new IterativeCondition<PayEvent>() {
         		@Override
         		public boolean filter(PayEvent payEvent, Context context)
throws Exception {
           			return payEvent.getName().equals("create");
         		}
       		})
       		.followedBy("pay")
       		.where(new IterativeCondition<PayEvent>() {
         		@Override
         		public boolean filter(PayEvent payEvent, Context context)
throws Exception {
           			return payEvent.getName().equals("pay");
         		}
       		})
       		.within(Time.seconds(600));
   5、 模式匹配
   6、 取出匹配成功的数据
  (1).采用测输出的方式
	OutputTag<PayEvent> orderTimeoutOutput = new OutputTag<PayEvent>("orderTimeout"){};
  (2).采用select方法
	SingleOutputStreamOperator<PayEvent> result =
patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent,
PayEvent>() {
     		@Override
     		public PayEvent timeout(Map<String, List<PayEvent>> map, long l)
throws Exception {
       			return map.get("begin").get(0);
     		}
   		}, new PatternSelectFunction<PayEvent, PayEvent>() {
     		@Override
     		public PayEvent select(Map<String, List<PayEvent>> map) throws
Exception {
       			return map.get("pay").get(0);
     		}
   		});
   		//result.print();
   		DataStream<PayEvent> sideOutput =result.getSideOutput(orderTimeoutOutput);
   		sideOutput.print();

   代码:
package com.lagou.mycep;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.cep.*;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class PayDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        DataStreamSource<PayBean> data = env.fromElements(
                new PayBean(1L, "create", 1597905234000L),
                new PayBean(1L, "pay", 1597905235000L),
                new PayBean(2L, "create", 1597905236000L),
                new PayBean(2L, "pay", 1597905237000L),
                new PayBean(3L, "create", 1597905239000L)
        );
        SingleOutputStreamOperator<PayBean> watermarks = data.assignTimestampsAndWatermarks(new WatermarkStrategy<PayBean>() {
            @Override
            public WatermarkGenerator<PayBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                return new WatermarkGenerator<PayBean>() {
                    long maxTimeStamp = Long.MIN_VALUE;
                    long maxOutOfOrderness = 5000l;

                    @Override
                    public void onEvent(PayBean event, long eventTimestamp, WatermarkOutput output) {
                        maxTimeStamp = Math.max(maxTimeStamp, event.getTs());
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(new Watermark(maxTimeStamp - maxOutOfOrderness));
                    }
                };
            }
        }.withTimestampAssigner((element, recordTimestamp) -> element.getTs()));
        KeyedStream<PayBean, Long> keyed = watermarks.keyBy(value -> value.getId());

        Pattern<PayBean, PayBean> pattern = Pattern.<PayBean>begin("start").where(new IterativeCondition<PayBean>() {
            @Override
            public boolean filter(PayBean value, Context<PayBean> ctx) throws Exception {
                return value.getState().equals("create");
            }
        })
                .followedBy("next").where(new IterativeCondition<PayBean>() {
                    @Override
                    public boolean filter(PayBean value, Context<PayBean> context) throws Exception {
                        return value.getState().equals("pay");
                    }
                })
                .within(Time.seconds(600));
        PatternStream<PayBean> patternStream = CEP.pattern(keyed, pattern);
        OutputTag<PayBean> outoftime = new OutputTag<PayBean>("outoftime") {};
        SingleOutputStreamOperator<PayBean> result = patternStream.select(outoftime, new PatternTimeoutFunction<PayBean, PayBean>() {
            @Override
            public PayBean timeout(Map<String, List<PayBean>> pattern, long timeoutTimeStamp) throws Exception {
                return pattern.get("start").get(0);
            }
        }, new PatternSelectFunction<PayBean, PayBean>() {
            @Override
            public PayBean select(Map<String, List<PayBean>> pattern) throws Exception {
                return pattern.get("start").get(0);
            }
        });
        DataStream<PayBean> sideOutput = result.getSideOutput(outoftime);
        sideOutput.print();

        env.execute();
    }
}
